package com.xy6.model.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.xy6.model.api.enums.MyStatus;
import com.xy6.model.bean.wrapper.MyJobWrapper;
import com.xy6.model.bean.wrapper.MyTaskWrapper;
import com.xy6.model.executor.MyExecutorFactory;
import com.xy6.model.executor.MyModelExecutor;
import com.xy6.model.service.MyThreadPool;
import com.xy6.model.service.manager.MyJobManager;
import com.xy6.model.service.monitor.MyTaskMonitor;
import com.xy6.model.utils.MyJobUtils;
import com.xy6.model.utils.MySystem;

/**
 * Executes one job and supplies the custom processing that runs before and
 * after execution.
 *
 * <p>The hooks {@code beforeExec}, {@code postExec} and {@code afterExec} are
 * overridden from {@link MyAbstractExecTemplate}; the order in which the
 * template invokes them is defined there, not in this class.
 *
 * <p>{@link #runTask(MyTaskWrapper)} is entered concurrently from several pool
 * threads (each finished task submits its successors), so the two control
 * flags are {@code volatile} and suspension uses this object's monitor.
 *
 * @author zhang
 * @since 2018-05-16
 */
public class MyJobExec extends MyAbstractExecTemplate {

	private final MyJobWrapper job;

	/**
	 * Guards the "has this task already begun?" check together with the
	 * BEGIN marker, so only one thread can claim a given task. Kept separate
	 * from this object's monitor, which is used for suspend/resume.
	 */
	private final Object taskStartLock = new Object();

	/** Whether execution is suspended: {@code true} suspended, {@code false} not suspended. */
	private volatile boolean suspendFlag = false;

	/**
	 * Set by {@link #mystop()}, when a blocking call is interrupted, or when
	 * waiting on a task fails. It is never reset; once set, no further task
	 * is started and the job ends with status INTERRUPT.
	 */
	private volatile boolean interruptedFlag = false;

	/**
	 * @param job the job whose DAG of tasks this instance executes
	 */
	public MyJobExec(MyJobWrapper job) {
		this.job = job;
	}

	/** Marks the job as BEGIN. */
	@Override
	protected void beforeExec() {
		MySystem.println("MyJobExec.beforeExec");
		MyJobManager.updateStatus(job, MyStatus.BEGIN);
	}

	/**
	 * Verifies the job and, when it is valid, runs its DAG starting from the
	 * first task. An invalid job is logged and skipped.
	 */
	@Override
	protected void postExec() {
		MySystem.println("MyJobExec.postExecDAG begin");
		boolean result = MyJobUtils.verify(job.getJob());
		if (!result) {
			MySystem.println("MyJobExec.postExecDAG end. job is invalid, not exec dag");
			return;
		}
		// Run the tasks in the DAG, starting from the first one.
		runTask(job.getFirst());
		MySystem.println("MyJobExec.postExecDAG end");
	}

	/**
	 * Recursively runs the tasks of the DAG.
	 *
	 * <p>Steps: skip the task if its predecessors are not finished; block
	 * while the job is suspended; claim the task so it runs only once; run it
	 * on the thread pool and wait for it; then submit every successor in
	 * parallel and wait for all of them. Each step returns early once the job
	 * has been stopped, interrupted or has failed.
	 *
	 * @param task the task to run
	 */
	private void runTask(final MyTaskWrapper task) {
		MySystem.println(String.format("MyJobExec.runTask begin %s", task.getId()));
		// Check the tasks this one depends on.
		boolean preFinished = MyJobUtils.isPreTaskFinished(task.getId(), job);
		if (!preFinished) {
			MySystem.println(String.format("MyJobExec.runTask %s pre tasks not finished", task.getId()));
			return;
		}
		MySystem.println(String.format("MyJobExec.runTask %s pre tasks finished", task.getId()));

		if (!awaitWhileSuspended(task)) {
			return;
		}
		if (!claimTask(task)) {
			return;
		}
		if (!executeAndAwait(task)) {
			return;
		}
		MySystem.println(String.format("MyJobExec.runTask %s run() end", task.getId()));

		if (!runNextTasks(task)) {
			return;
		}
		MySystem.println(String.format("MyJobExec.runTask end %s", task.getId()));
	}

	/**
	 * Pauses the calling thread while the job is suspended; the flag is
	 * checked before the task starts executing.
	 *
	 * @param task the task about to run (used for logging only)
	 * @return {@code true} if the task may proceed, {@code false} if the job
	 *         was stopped or the wait was interrupted
	 */
	private boolean awaitWhileSuspended(MyTaskWrapper task) {
		try {
			synchronized (this) {
				MySystem.println(String.format("MyJobExec.runTask suspendFlag %s", suspendFlag));
				// Also leave the loop once stopped, so a stopped job never stays parked.
				while (suspendFlag && !interruptedFlag) {
					wait();
				}
			}
		} catch (InterruptedException e) {
			interruptedFlag = true;
			// Restore the interrupt status for code further up the stack.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		if (interruptedFlag) {
			MySystem.println(String.format("MyJobExec.runTask %s wait() Interrupted1", task.getId()));
			return false;
		}
		return true;
	}

	/**
	 * Marks the task as BEGIN unless another thread already did.
	 *
	 * <p>Two tasks can finish at the same moment and both try to start the
	 * same successor. The check and the BEGIN marker are therefore performed
	 * under one lock; done separately, both threads could pass the check and
	 * the successor would run twice.
	 *
	 * @param task the task to claim
	 * @return {@code true} if the caller now owns the task, {@code false} if
	 *         it had already begun
	 */
	private boolean claimTask(MyTaskWrapper task) {
		synchronized (taskStartLock) {
			if (MyTaskMonitor.isTaskBegin(job.getId(), task.getId())) {
				MySystem.println(String.format("MyJobExec.runTask %s already begin run", task.getId()));
				return false;
			}
			MyTaskMonitor.put(job.getId(), task.getId(), MyStatus.BEGIN);
		}
		return true;
	}

	/**
	 * Runs the task's model executor on the thread pool and waits for it.
	 *
	 * <p>On success the task is recorded as END. An interruption or an
	 * execution failure sets {@link #interruptedFlag}, and the task is
	 * recorded as INTERRUPT whenever that flag is set.
	 *
	 * @param task the task to execute
	 * @return {@code true} if the task finished and the job is still running,
	 *         {@code false} otherwise
	 */
	private boolean executeAndAwait(final MyTaskWrapper task) {
		Future<?> future = MyThreadPool.submit(new Runnable() {
			@Override
			public void run() {
				MySystem.println(String.format("MyJobExec.runTask %s run() begin", task.getId()));
				MyModelExecutor executor = MyExecutorFactory.produce(task.getModelInfo().getType());
				try {
					executor.exec(task);
				} catch (InterruptedException e) {
					interruptedFlag = true;
					Thread.currentThread().interrupt();
					e.printStackTrace();
				}
				// Checked after the try block rather than in a finally with return,
				// so an unchecked exception from exec() reaches future.get().
				if (interruptedFlag) {
					MyTaskMonitor.put(job.getId(), task.getId(), MyStatus.INTERRUPT);
					MySystem.println(String.format("MyJobExec.runTask %s run() Interrupted1", task.getId()));
				}
			}
		});
		try {
			// Wait for the task to finish. On any exception the job is terminated.
			future.get();
			// Record in the monitor that the task has finished.
			MyTaskMonitor.put(job.getId(), task.getId(), MyStatus.END);
		} catch (InterruptedException e) {
			interruptedFlag = true;
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (ExecutionException e) {
			// A failed task is handled the same way as an interruption.
			interruptedFlag = true;
			e.printStackTrace();
		}
		if (interruptedFlag) {
			MyTaskMonitor.put(job.getId(), task.getId(), MyStatus.INTERRUPT);
			MySystem.println(String.format("MyJobExec.runTask %s future.get() Interrupted2", task.getId()));
			return false;
		}
		return true;
	}

	/**
	 * Submits every successor of the task to the thread pool, in parallel
	 * when there are several, and waits for all of them.
	 *
	 * @param task the task that has just finished
	 * @return {@code true} if all successors completed and the job is still
	 *         running, {@code false} otherwise
	 */
	private boolean runNextTasks(MyTaskWrapper task) {
		// The original author notes that no null check is needed here.
		List<String> nextTasks = job.getNextTasks().get(task.getId());
		List<Future<?>> nextFutures = new ArrayList<>(nextTasks.size());
		for (final String nextTaskId : nextTasks) {
			MySystem.println(String.format("MyJobExec.runTask submit next %s->%s", task.getId(), nextTaskId));
			Future<?> nextFuture = MyThreadPool.submit(new Runnable() {
				@Override
				public void run() {
					runTask(job.getMapTasks().get(nextTaskId));
				}
			});
			nextFutures.add(nextFuture);
		}
		for (Future<?> f : nextFutures) {
			try {
				// Wait for the successors before afterExec and similar steps run.
				// On any exception the job is terminated.
				f.get();
			} catch (InterruptedException e) {
				interruptedFlag = true;
				Thread.currentThread().interrupt();
				e.printStackTrace();
			} catch (ExecutionException e) {
				interruptedFlag = true;
				e.printStackTrace();
			}
			if (interruptedFlag) {
				MySystem.println(String.format("MyJobExec.runTask %s nextfuture.get() Interrupted3", task.getId()));
				return false;
			}
		}
		return true;
	}

	/**
	 * Records the final job status (INTERRUPT if the job was stopped,
	 * interrupted or failed, END otherwise) and prints the status recorded by
	 * the task monitor for this job.
	 */
	@Override
	protected void afterExec() {
		MySystem.println("MyJobExec.afterExec");
		if (interruptedFlag) {
			MyJobManager.updateStatus(job, MyStatus.INTERRUPT);
		} else {
			MyJobManager.updateStatus(job, MyStatus.END);
		}
		MySystem.println(MyTaskMonitor.getJobStatus(job.getId()).toString());
	}

	/**
	 * Sets the suspend flag. A task that is already executing is not paused;
	 * the flag is only examined before a task starts.
	 */
	@Override
	public void mysuspend() {
		suspendFlag = true;
		MySystem.println("MyJobExec.mysuspend()");
	}

	/**
	 * Clears the suspend flag and wakes every thread waiting on it so that
	 * execution continues.
	 */
	@Override
	public void myresume() {
		synchronized (this) {
			suspendFlag = false;
			notifyAll();
			MySystem.println("MyJobExec.myresume()");
		}
	}

	/**
	 * Sets the stop flag so that no further task is started, and releases any
	 * thread currently parked by a suspend.
	 */
	@Override
	public void mystop() {
		interruptedFlag = true;
		MySystem.println("MyJobExec.mystop()");
		// Always clear the flag and notify under the lock. Reading suspendFlag
		// outside the lock first could race with a concurrent mysuspend() and
		// leave waiters parked after a stop.
		synchronized (this) {
			suspendFlag = false;
			notifyAll();
		}
	}

}
